package com.github.mall.common.kafka.consumer.impl;

import cn.hutool.json.JSONUtil;
import com.github.mall.common.entity.SysLogEntity;
import com.github.mall.common.kafka.consumer.IKafkaConsumer;
import com.github.mall.common.kafka.entity.Message;
import com.github.mall.common.services.SysLogService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@AllArgsConstructor
@Slf4j
public class SysLogKafkaConsumer implements IKafkaConsumer {

    private final SysLogService sysLogService;

    @Override
    @KafkaListener(topics = {"sys_log_topic"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Message message = JSONUtil.toBean(JSONUtil.parseObj(kafkaMessage.get()) ,Message.class);
            sysLogService.save( JSONUtil.toBean(JSONUtil.parseObj(message.getData()) ,SysLogEntity.class));
            if(log.isDebugEnabled()){
                log.debug("日志保存成功");
            }
        }
    }
}
